/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.iotdb.db.mpp.plan.planner.distribution;

import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
import org.apache.iotdb.db.mpp.plan.planner.IFragmentParallelPlaner;
import org.apache.iotdb.db.mpp.plan.planner.plan.*;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.WritePlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.FragmentSinkNode;
import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.ShowQueriesStatement;

import java.util.List;

public class DistributionPlanner {
    private Analysis analysis;
    private MPPQueryContext context;
    private LogicalQueryPlan logicalPlan;

    private int planFragmentIndex = 0;

    public DistributionPlanner(Analysis analysis, LogicalQueryPlan logicalPlan) {
        this.analysis = analysis;
        this.logicalPlan = logicalPlan;
        this.context = logicalPlan.getContext();
    }

    public PlanNode rewriteSource() {
        SourceRewriter rewriter = new SourceRewriter(this.analysis);
        List<PlanNode> planNodeList =
                rewriter.visit(logicalPlan.getRootNode(), new DistributionPlanContext(context));
        if (planNodeList.size() != 1) {
            throw new IllegalStateException("root node must return only one");
        } else {
            return planNodeList.get(0);
        }
    }

    public PlanNode addExchangeNode(PlanNode root) {
        ExchangeNodeAdder adder = new ExchangeNodeAdder(this.analysis);
        return adder.visit(root, new NodeGroupContext(context));
    }

    public SubPlan splitFragment(PlanNode root) {
        FragmentBuilder fragmentBuilder = new FragmentBuilder(context);
        return fragmentBuilder.splitToSubPlan(root);
    }

    public DistributedQueryPlan planFragments() {
        PlanNode rootAfterRewrite = rewriteSource();
        PlanNode rootWithExchange = addExchangeNode(rootAfterRewrite);
        if (analysis.getStatement() instanceof QueryStatement
                || analysis.getStatement() instanceof ShowQueriesStatement) {
            analysis
                    .getRespDatasetHeader()
                    .setColumnToTsBlockIndexMap(rootWithExchange.getOutputColumnNames());
        }
        SubPlan subPlan = splitFragment(rootWithExchange);
        // Mark the root Fragment of root SubPlan as `root`
        subPlan.getPlanFragment().setRoot(true);
        List<FragmentInstance> fragmentInstances = planFragmentInstances(subPlan);
        // Only execute this step for READ operation
        if (context.getQueryType() == QueryType.READ) {
            SetSinkForRootInstance(subPlan, fragmentInstances);
        }
        return new DistributedQueryPlan(
                logicalPlan.getContext(), subPlan, subPlan.getPlanFragmentList(), fragmentInstances);
    }

    // Convert fragment to detailed instance
    // And for parallel-able fragment, clone it into several instances with different params.
    public List<FragmentInstance> planFragmentInstances(SubPlan subPlan) {
        IFragmentParallelPlaner parallelPlaner =
                context.getQueryType() == QueryType.READ
                        ? new SimpleFragmentParallelPlanner(subPlan, analysis, context)
                        : new WriteFragmentParallelPlanner(subPlan, analysis, context);
        return parallelPlaner.parallelPlan();
    }

    // TODO: (xingtanzjr) Maybe we should handle ResultNode in LogicalPlanner ?
    public void SetSinkForRootInstance(SubPlan subPlan, List<FragmentInstance> instances) {
        FragmentInstance rootInstance = null;
        for (FragmentInstance instance : instances) {
            if (instance.getFragment().getId().equals(subPlan.getPlanFragment().getId())) {
                rootInstance = instance;
                break;
            }
        }
        // root should not be null during normal process
        if (rootInstance == null) {
            return;
        }

        FragmentSinkNode sinkNode = new FragmentSinkNode(context.getQueryId().genPlanNodeId());
        sinkNode.setDownStream(
                context.getLocalDataBlockEndpoint(),
                context.getResultNodeContext().getVirtualFragmentInstanceId(),
                context.getResultNodeContext().getVirtualResultNodeId());
        sinkNode.setChild(rootInstance.getFragment().getPlanNodeTree());
        context
                .getResultNodeContext()
                .setUpStream(
                        rootInstance.getHostDataNode().mPPDataExchangeEndPoint,
                        rootInstance.getId(),
                        sinkNode.getPlanNodeId());
        rootInstance.getFragment().setPlanNodeTree(sinkNode);
    }

    private PlanFragmentId getNextFragmentId() {
        return this.logicalPlan.getContext().getQueryId().genPlanFragmentId();
    }

    private class FragmentBuilder {
        private MPPQueryContext context;

        public FragmentBuilder(MPPQueryContext context) {
            this.context = context;
        }

        public SubPlan splitToSubPlan(PlanNode root) {
            SubPlan rootSubPlan = createSubPlan(root);
            splitToSubPlan(root, rootSubPlan);
            return rootSubPlan;
        }

        private void splitToSubPlan(PlanNode root, SubPlan subPlan) {
            // TODO: (xingtanzjr) we apply no action for IWritePlanNode currently
            if (root instanceof WritePlanNode) {
                return;
            }
            if (root instanceof ExchangeNode) {
                // We add a FragmentSinkNode for newly created PlanFragment
                ExchangeNode exchangeNode = (ExchangeNode) root;
                FragmentSinkNode sinkNode = new FragmentSinkNode(context.getQueryId().genPlanNodeId());
                sinkNode.setChild(exchangeNode.getChild());
                sinkNode.setDownStreamPlanNodeId(exchangeNode.getPlanNodeId());

                // Record the source node info in the ExchangeNode so that we can keep the connection of
                // these nodes/fragments
                exchangeNode.setRemoteSourceNode(sinkNode);
                // We cut off the subtree to make the ExchangeNode as the leaf node of current PlanFragment
                exchangeNode.cleanChildren();

                // Build the child SubPlan Tree
                SubPlan childSubPlan = createSubPlan(sinkNode);
                splitToSubPlan(sinkNode, childSubPlan);

                subPlan.addChild(childSubPlan);
                return;
            }
            for (PlanNode child : root.getChildren()) {
                splitToSubPlan(child, subPlan);
            }
        }

        private SubPlan createSubPlan(PlanNode root) {
            PlanFragment fragment = new PlanFragment(getNextFragmentId(), root);
            return new SubPlan(fragment);
        }
    }
}
